package com.dbflow5.rx2.query;

import com.dbflow5.BaseUnitTest;
import com.dbflow5.config.FlowManager;
import com.dbflow5.database.DatabaseWrapper;
import com.dbflow5.models.SimpleModel_Table;
import com.dbflow5.models.SimpleTestModels.SimpleModel;
import com.dbflow5.query.ModelQueriable;
import com.dbflow5.query.SQLite;
import com.dbflow5.reactivestreams.query.ModelQueriableExtensions;
import com.dbflow5.reactivestreams.transaction.TransactionObservable;
import com.dbflow5.structure.Model;
import com.dbflow5.transaction.ITransaction;

import io.reactivex.rxjava3.core.Flowable;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;

import static org.junit.Assert.assertEquals;

/**
 * Tests for streaming query results and for observing table changes through
 * the reactive query extensions ({@link ModelQueriableExtensions} and
 * {@link TransactionObservable}) using {@link SimpleModel} rows.
 */
public class CursorResultSubscriberTest extends BaseUnitTest {

    /**
     * Saves ten {@link SimpleModel} rows, streams the result of a select over
     * the table, and verifies that exactly ten non-null models are delivered.
     */
    @Test
    public void testCanQueryStreamResults() {
        FlowManager.databaseForTable(SimpleModel.class, db -> {
            for (int i = 0; i <= 9; i++) {
                Model.save(SimpleModel.class, new SimpleModel(String.valueOf(i)), db);
            }

            AtomicInteger count = new AtomicInteger();
            ModelQueriableExtensions.queryStreamResults(SQLite.select().from(SimpleModel.class), db)
                    .subscribe(simpleModel -> {
                        count.getAndIncrement();
                        // The `assert` keyword is skipped unless the JVM runs with -ea,
                        // so fail explicitly to make the check unconditional.
                        if (simpleModel == null) {
                            throw new AssertionError("queryStreamResults emitted a null model");
                        }
                    });

            assertEquals(10, count.get());
            return null;
        });
    }

    /**
     * Subscribes to a flowable built from a select over {@link SimpleModel},
     * then runs one transaction that saves, deletes and re-inserts a model.
     * The subscriber is expected to have been invoked twice by the end.
     */
    @Test
    public void testCanObserveOnTableChangesWithModelOps() {
        AtomicInteger count = new AtomicInteger();

        TransactionObservable.asFlowable(SQLite.select().from(SimpleModel.class), (BiFunction<ModelQueriable<SimpleModel>, DatabaseWrapper, Object>) ModelQueriable::queryList).subscribe(simpleModel -> {
            count.getAndIncrement();
        });
        SimpleModel model = new SimpleModel("test");

        // All three model operations run inside a single transaction.
        FlowManager.databaseForTable(SimpleModel.class, dbFlowDatabase -> null).executeTransactionFunc(db -> {
            Model.save(SimpleModel.class, model, db);
            Model.delete(SimpleModel.class, model, db);
            return Model.insert(SimpleModel.class, model, db);
        });
        // NOTE(review): presumably one emission on subscribe plus one for the
        // transaction above -- confirm against TransactionObservable.
        assertEquals(2, count.get());
    }

    /**
     * Clears the {@link SimpleModel} table, subscribes to a flowable that
     * re-queries the table's contents, and checks the most recently received
     * list after an insert transaction (three rows) and after a delete
     * transaction (two rows). Three emissions are expected in total.
     */
    @Test
    public void testCanObserveOnTableChangesWithTableOps() {
        FlowManager.databaseForTable(SimpleModel.class, dbFlowDatabase -> {
            // Start from an empty table so the size assertions below are exact.
            SQLite.delete(SimpleModel.class).executeUpdateDelete(dbFlowDatabase);

            AtomicInteger count = new AtomicInteger();

            // Holds the most recent list delivered to the subscriber.
            AtomicReference<List<SimpleModel>> curList = new AtomicReference<>(new ArrayList<>());

            Flowable<List<SimpleModel>> flowable = TransactionObservable.asFlowable(SQLite.select().from(SimpleModel.class), new BiFunction<ModelQueriable<SimpleModel>, DatabaseWrapper, List<SimpleModel>>() {
                @Override
                public List<SimpleModel> apply(ModelQueriable<SimpleModel> modelModelQueriable, DatabaseWrapper databaseWrapper) {
                    return modelModelQueriable.queryList(databaseWrapper);
                }
            });
            flowable.subscribe(simpleModels -> {
                count.getAndIncrement();
                curList.set(simpleModels);
            });
            dbFlowDatabase.executeTransactionFunc(db -> {
                SQLite.insert(SimpleModel.class, SimpleModel_Table.name).values("test").executeInsert(db);
                SQLite.insert(SimpleModel.class, SimpleModel_Table.name).values("test1").executeInsert(db);
                return SQLite.insert(SimpleModel.class, SimpleModel_Table.name).values("test2").executeInsert(db);
            });

            assertEquals(3, curList.get().size());

            // Remove the row named "test" in a second transaction.
            dbFlowDatabase.executeTransactionFunc(databaseWrapper -> {
                SimpleModel model = SQLite.select().from(SimpleModel.class).where(SimpleModel_Table.name.eq("test")).requireSingle(databaseWrapper);
                return Model.delete(SimpleModel.class, model, databaseWrapper);
            });

            // Explicitly ask the table observer to check for updates before asserting.
            dbFlowDatabase.tableObserver().checkForTableUpdates();
            assertEquals(2, curList.get().size());
            assertEquals(3, count.get());
            return null;
        });
    }
}
